package ws

import (
	"fmt"
	"time"

	"gitee.com/sansaniot/ssiot-core/logger"
	"github.com/gorilla/websocket"
)

const (
	writeWait = 10 * time.Second

	// 读下一条消息的超时时间
	pongWait = 60 * time.Second

	// 必须小于pongWait
	pingPeriod = (pongWait * 9) / 10

	// 读取消息大小上限 (如ssh协议透传字节数较多)
	maxMessageSize = 2048
)

var (
	newline = []byte{'\n'}
	space   = []byte{' '}
)

type Client struct {
	// 请求参数
	Param ClientParam
	// 权限
	Auth ClientAuth

	// client连接时取数据
	ClientConnCall func(c *Client) [][]byte
	// 接收client消息
	ClientMsgRecv func(c *Client, message []byte)
	// 广播消息权限过滤
	ClientBroadFilter func(c *Client, item string) bool
	// client断开时回调
	ClientUnRegCall func(param ClientParam)

	// 客户端消息 集线器
	hub *Hub
	// conn对象
	conn *websocket.Conn
	// 写消息
	send chan []byte
}

type ClientParam struct {
	Sn           string                 `json:"sn"`
	DeptId       string                 `json:"deptId"`
	ProductModel string                 `json:"productModel"`
	Key          string                 `json:"key"`
	Attr         map[string]interface{} `json:"attr"`
}

func (p ClientParam) toString() string {
	return fmt.Sprintf("sn=%s deptId=%s productModel=%s key=%s attr=%v", p.Sn, p.DeptId, p.ProductModel, p.Key, p.Attr)
}

type ClientAuth struct {
	UserId     string
	AgencyId   string
	DeptIdList []string
	//SnList           []string
	//ProductModelList []string
}

func (c *Client) toString() string {
	return fmt.Sprintf("[wsclient:%s params:%s]", c.conn.RemoteAddr().String(), c.Param.toString())
}

func (c *Client) writePump() {
	ticker := time.NewTicker(pingPeriod)
	defer func() {
		ticker.Stop()
		_ = c.conn.Close()
	}()
	for {
		select {
		case message, ok := <-c.send:
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if !ok {
				// 集线器关闭了conn
				_ = c.conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			w, err := c.conn.NextWriter(websocket.TextMessage)
			if err != nil {
				return
			}
			// 业务处理
			if len(message) == 0 {
				continue
			}
			// 通信客户端
			_, _ = w.Write(message)
			logger.Debugf("推送[%s]消息[%s]", c.GetRemoteAddr(), string(message))
			if err = w.Close(); err != nil {
				return
			}
		case <-ticker.C:
			// 定时ping
			_ = c.conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				return
			}
		}
	}
}

func (c *Client) readPump() {
	defer func() {
		c.hub.unregister <- c
		_ = c.conn.Close()
	}()
	c.conn.SetReadLimit(maxMessageSize)
	_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
	c.conn.SetPongHandler(func(string) error {
		// pong正常
		_ = c.conn.SetReadDeadline(time.Now().Add(pongWait))
		return nil
	})
	for {
		_, message, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				logger.Error(err)
			}
			break
		}
		// 客户端消息原封不变(如透传ssh协议末尾就存在换行符)
		//message = bytes.TrimSpace(bytes.Replace(message, newline, space, -1))
		logger.Debugf("收到[%s]消息[%s]", c.GetRemoteAddr(), string(message))
		// 处理请求
		if c.ClientMsgRecv != nil {
			c.ClientMsgRecv(c, message)
		}
	}
}

func (c *Client) Client2Hub() {
	runHub()
	c.hub = hub
	c.send = make(chan []byte, 256)

	hub.register <- c

	go c.writePump()
	go c.readPump()
}

func (c *Client) SetConn(conn *websocket.Conn) {
	c.conn = conn
}

func (c *Client) GetRemoteAddr() string {
	return c.conn.RemoteAddr().String()
}

func (c *Client) SendMessage(sendMsg []byte) {
	go func() {
		select {
		case c.send <- sendMsg:
		default:
			close(c.send)
			if hub != nil {
				hub.clients.Delete(c)
			}
		}
	}()
}
